use super::status::*;
use crate::event::{Event, Scheduler};
use crate::runtime::{
    ActiveWorker, ArcGroup, Attr, FdSet, RawTaskContext, TaskContext, TaskRef, TaskWaker, Worker,
};
use crate::Result;
use core::alloc::Layout;
use core::mem::ManuallyDrop;
use core::ptr::{self, NonNull};
use core::sync::atomic::Ordering::Relaxed;
use core::task::{Context, Poll, Waker};
use hioff::container_of_mut;
use hipool::{Boxed, MemPool, NullAlloc};

pub(crate) type TaskPool = Boxed<'static, MemPool, NullAlloc>;

#[repr(C)]
pub struct RawTask {
    pub(crate) status: AtomicStatus,
    pub(crate) next: *mut RawTask,
    pub(crate) vtable: &'static RawTaskVTable,
    pub(crate) event: Event,
    pub(crate) group: ArcGroup,
    pub(crate) private: *const (),
    pub(crate) fdset: FdSet<'static>,
}

impl RawTask {
    pub(crate) fn new(vtable: &'static RawTaskVTable, attr: &Attr, group: ArcGroup) -> Self {
        Self {
            next: ptr::null_mut(),
            vtable,
            event: Event::new(Self::event_handle),
            status: AtomicStatus::new(attr),
            group,
            private: ptr::null(),
            fdset: FdSet::new(attr.fdset.into()),
        }
    }

    pub(crate) fn inc_ref(&self) {
        self.status.inc_ref();
    }

    pub(crate) fn dec_ref(&self) {
        let _ = self.dec_ref_inline();
    }

    #[inline(always)]
    fn dec_ref_inline(&self) -> Option<TaskPool> {
        if self.status.test_dec_ref() {
            let this = unsafe { core::ptr::NonNull::from(self).as_mut() };
            return Some(unsafe { this.release() });
        }
        None
    }

    pub(crate) fn task_ref(&self) -> TaskRef {
        self.inc_ref();
        TaskRef::new(NonNull::from(self))
    }

    pub(crate) fn fast_wake_by_ref(&self, worker: &mut ActiveWorker) {
        if self.status.test_inc_wake() {
            worker.sched_local(self.task_ref());
        }
    }
}

impl TaskRef {
    pub(crate) fn fast_wake(self, worker: &mut ActiveWorker) {
        if self.status.test_inc_wake() {
            worker.sched_local(self);
        } else {
            self.cache(worker);
        }
    }
    pub(crate) fn cache(self, worker: &mut ActiveWorker) {
        let this = ManuallyDrop::new(self);
        if let Some(task_pool) = this.dec_ref_inline() {
            worker.cache_task_pool(task_pool);
        }
    }
}

impl RawTask {
    pub(crate) fn is_finished(&self) -> bool {
        let status = self.status.status(Relaxed);
        status >= STAT_FINISH
    }

    pub(crate) fn abort(&self) -> bool {
        if self
            .status
            .cmp_xchg_status(STAT_RUN, STAT_ABORT, Relaxed)
            .is_ok()
        {
            self.wake();
            return true;
        }
        false
    }

    pub(crate) fn wake(&self) {
        //可能被多次唤醒，也可能正在运行中，确保只在首次唤醒的时候才真正执行调度
        if self.status.test_inc_wake() {
            let local = self.status.get_local();
            if let Some(current) = ActiveWorker::current() {
                if current.in_group(&self.group) {
                    return current.wake(self.task_ref(), local);
                }
            }
            self.group.sched(self.task_ref(), local);
        }
    }

    #[inline(always)]
    fn run_continue(&self) -> bool {
        let status = self.status.status(Relaxed);
        if status < STAT_FINISH {
            true
        } else {
            assert!(status == STAT_ABORT || status == STAT_EXIT);
            false
        }
    }

    fn event_handle(event: &Event, events: u32, flags: u16, sched: &mut dyn Scheduler) {
        let this = unsafe { &mut *container_of_mut!(event, Self, event) };
        if events == 0 {
            // 如果是fd_event, events一定不为0
            // 引用计数在进入队列之前已经增加, 需要消耗引用计数.
            let task = TaskRef::new(NonNull::from(this));
            task.run(unsafe { ActiveWorker::from_sched(sched) });
        } else {
            // 这里不能消耗引用计数. 只有所有fd都取消注册之后才可以.
            if this.fdset.handle_events(events, flags) {
                let worker = unsafe { ActiveWorker::from_sched(sched) };
                this.fast_wake_by_ref(worker);
            }
        }
    }

    fn fdset_close(&mut self, worker: &mut ActiveWorker) {
        self.fdset.exit(worker);
        if self.fdset.fd_capacity() > 0 {
            let result = self.status.test_dec_ref();
            debug_assert!(!result);
        }
    }
}

impl TaskRef {
    pub(crate) fn run(mut self, worker: &mut ActiveWorker) {
        let task_waker = TaskWaker::new(&mut self, worker);
        let mut ctx = Context::from_waker(task_waker.waker());
        if self.run_continue() {
            let cnt = self.status.wake_count();
            self.status.poll_inc();
            match self.poll(&mut ctx) {
                Poll::Ready(_) => {
                    core::mem::drop(task_waker);
                    self.fdset_close(worker);
                    self.cache(worker);
                    return;
                }
                Poll::Pending => {}
            }

            // 执行过程中没有被提前终止，则等待被唤醒
            if !ctx.exited() {
                if self.status.test_dec_wake(cnt) {
                    core::mem::drop(task_waker);
                    worker.sched_local(self);
                }
                return;
            }

            // #23 设置EXIT状态, 通知JoinHandle
            self.exit(STAT_EXIT);
        }
        // 被提前终止. 如果从未被调度过，则直接返回. 如果曾经被调度过，则利用abort机制清理资源.
        if self.status.poll_cnt() > 0 {
            ctx.set_aborted(true);
            let _ = self.poll(&mut ctx);
        }
        core::mem::drop(task_waker);
        self.fdset_close(worker);
        self.cache(worker);
    }
}

#[repr(C)]
pub struct RawTaskVTable {
    pub poll: fn(&mut RawTask, ctx: &mut Context<'_>) -> Poll<()>,
    pub release: unsafe fn(&mut RawTask) -> TaskPool,
    pub output: unsafe fn(&RawTask, waker: &Waker, output: *mut ()) -> Poll<Result<()>>,
    pub join: unsafe fn(&RawTask, output: *mut ()) -> Result<()>,
    pub exit: fn(&mut RawTask, stat: u32),
    pub tail: unsafe fn(&RawTask, layout: Layout) -> *const (),
}

impl RawTask {
    unsafe fn release(&mut self) -> TaskPool {
        (self.vtable.release)(self)
    }

    pub fn poll(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
        (self.vtable.poll)(self, ctx)
    }
    pub fn exit(&mut self, stat: u32) {
        (self.vtable.exit)(self, stat)
    }

    /// # Safety
    /// 调用者保证传递的output的大小和Future::Output大小相同.
    pub unsafe fn output(&self, waker: &Waker, output: *mut ()) -> Poll<Result<()>> {
        (self.vtable.output)(self, waker, output)
    }
    /// # Safety
    /// 调用者保证传递的output的大小和Future::Output大小相同.
    pub unsafe fn join(&self, output: *mut ()) -> Result<()> {
        (self.vtable.join)(self, output)
    }

    /// # Safety
    /// 调用者保证传递的layout和task创建时attr的tail设置的layout相同.
    pub unsafe fn tail(&self, layout: Layout) -> *const () {
        (self.vtable.tail)(self, layout)
    }
}
